
Conversation

@suryaprasanna (Contributor)

What is the purpose of the pull request

This pull request adds support for a log compaction action for MOR (Merge-On-Read) table types.

Brief change log

  • Support log compaction action for MOR tables

Verify this pull request

This change added tests and can be verified as follows:

  • Added unit tests in the hudi-common module to verify the correct ordering of log blocks.
  • Added unit tests at the file-system API level to make sure log compaction is supported.
  • Added unit tests at the client level to verify that the log compaction rewrites work.
  • Added a stress test to rigorously exercise log compaction alongside compaction on two different tables.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@vinothchandar vinothchandar self-assigned this Jun 28, 2022
@vinothchandar vinothchandar added the rfc Request for comments label Jun 28, 2022
@prasannarajaperumal prasannarajaperumal self-requested a review July 15, 2022 05:55
@prasannarajaperumal (Contributor) left a comment

Looks good to me overall. Minor comments.

@xushiyan xushiyan added big-needle-movers and removed rfc Request for comments labels Jul 30, 2022
@xushiyan xushiyan added the priority:critical Production degraded; pipelines stalled label Aug 15, 2022
@prasannarajaperumal (Contributor) left a comment

Changes to AbstractHoodieLogRecordReader need a more careful review.

@prasannarajaperumal prasannarajaperumal self-requested a review August 16, 2022 09:53
@nsivabalan (Contributor) left a comment

I am done reviewing the source code changes except AbstractHoodieLogRecordReader; I will review AHLogRecordReader in a day or two.

return TimelineDiffResult.UNSAFE_SYNC_RESULT;
}

// Rather than checking for compaction instants with completed check and a commit file,

Trying to see if this is a mandatory fix; can we leave it as is? Trying not to make any changes to the core flow.

@nsivabalan (Contributor) left a comment

Still halfway through reviewing AHLogRecordReader, but Prasanna raised a question somewhere about whether we can filter out log blocks whose size is greater than some threshold while doing log compaction. Wanted to comment on that without fail.

As per the existing patch, I don't think that is doable; it would result in wrong data being served, because the compacted log block takes higher precedence over any blocks prior to it.

For example:
LB_1, LB_2, LB_3 and LB_4(LB_1, LB_3)

LB_1, LB_2 and LB_3 are regular data log blocks, whereas LB_4 is the compacted one, but it includes only LB_1 and LB_3; say LB_2 was skipped due to its larger size.

So, while reading records from this file slice and computing the final snapshot of all records, we will essentially read LB_4 and then LB_2. But if a record was updated in both LB_1 and LB_2, we ideally want the version in LB_2 as the final one; yet if we read just LB_4 and LB_2, LB_4 will win, which implicitly means LB_1's version takes precedence.

So, as of now, the design assumes that any log compaction includes every log block preceding it. Just wanted to remind you folks as well. Can we take another pass through the entire patch to ensure this assumption holds everywhere? If not, it could result in wrong data being served.
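A minimal, self-contained sketch of the precedence problem described above (the block contents, the reader loop, and the class name are hypothetical illustrations, not Hudi APIs): the reader scans the newest block first and keeps the first version seen per key, so a compacted block that skipped LB_2 shadows LB_2's newer update.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of log-block precedence; not actual Hudi code.
public class LogBlockPrecedenceSketch {
  public static void main(String[] args) {
    // Regular data log blocks, oldest first: key -> value written in that block.
    Map<String, String> lb1 = Map.of("k", "v1"); // LB_1
    Map<String, String> lb2 = Map.of("k", "v2"); // LB_2, skipped during log compaction
    // Compacted block LB_4 covers only LB_1 and LB_3, so it carries LB_1's stale value for "k".
    Map<String, String> lb4 = Map.of("k", "v1");

    // Reader scans blocks newest-first and keeps the first version seen per key.
    Map<String, String> snapshot = new HashMap<>();
    for (Map<String, String> block : List.of(lb4, lb2)) {
      block.forEach(snapshot::putIfAbsent);
    }

    // Prints "v1": LB_4 shadows LB_2, losing the newer update "v2".
    // Including LB_2 in the compacted block would have preserved it.
    System.out.println(snapshot.get("k"));
  }
}
```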

String compactedFinalInstantTime = blockTimeToCompactionBlockTimeMap.get(instantTime);
if (compactedFinalInstantTime == null) {
  // If it is not compacted, then add the blocks related to the instant time at the end of the queue and continue.
  instantToBlocksMap.get(instantTime).forEach(block -> currentInstantLogBlocks.addLast(block));

We can use instantsBlocks here:

instantsBlocks.forEach(block -> currentInstantLogBlocks.addLast(block));


Regarding L560:
when we have multiple blocks for a given instantTime, shouldn't we be adding those blocks in reverse order?
For example,
let's say for C1 we have LB_1, LB_2, LB_3, and for C2 we have LB_4.

So, instantToBlocksMap will have 2 entries.
When we process C1, the value is going to be LB_1, LB_2, LB_3, right? So we can't do forEach here; we have to add LB_3, followed by LB_2, followed by LB_1.
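A minimal sketch of the reverse-order insertion being suggested (the deque and list are stand-ins for currentInstantLogBlocks and one instant's block list; everything else is hypothetical scaffolding):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.ListIterator;

// Hypothetical sketch: add one instant's blocks to the processing deque in reverse order.
public class ReverseBlockInsertSketch {
  public static void main(String[] args) {
    Deque<String> currentInstantLogBlocks = new ArrayDeque<>();
    List<String> instantsBlocks = List.of("LB_1", "LB_2", "LB_3"); // blocks for C1, oldest first

    // Walk the list from the last block to the first, so LB_3 is appended before LB_2, etc.
    ListIterator<String> it = instantsBlocks.listIterator(instantsBlocks.size());
    while (it.hasPrevious()) {
      currentInstantLogBlocks.addLast(it.previous());
    }

    System.out.println(currentInstantLogBlocks); // [LB_3, LB_2, LB_1]
  }
}
```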


Is this addressed?

@suryaprasanna (Contributor, Author)

Addressed this comment now.

@nsivabalan (Contributor) left a comment

Done with my review of AbstractLogRecordReader as well.
I feel the new logic is a tad simpler than the older one and is manageable/comprehensible; the older one had a few corner cases here and there. Good job on the entire patch @suryaprasanna 👍

@nsivabalan (Contributor)

@prasannarajaperumal: As of now, Surya has introduced a completely new scan() within AbstractLogRecordReader. It was not easy to reuse code from the older one, and it was getting complicated. If you can take a look and provide any suggestions on how to abstract these out, that would be nice; or can we go ahead with this (somewhat) duplicated code?

@prasannarajaperumal (Contributor) left a comment

+1. Thanks @suryaprasanna. Can you please get to the comments from @nsivabalan? We can land this soon, I hope.

@suryaprasanna (Contributor, Author)

@prasannarajaperumal
My apologies, I was off until today due to some personal reasons. Will work on this PR now and get it merged.

@nsivabalan (Contributor)

Hey @suryaprasanna: we also wanted to see if it's feasible to not introduce a new action but instead reuse the ones for compaction. Can you check the feasibility of that?

@prasannarajaperumal (Contributor) left a comment

@suryaprasanna - Thanks for addressing the comments. Can you look at this specific one?
https://github.com/apache/hudi/pull/5958/files#r956601221
A couple of other minor ones look like they are not handled as well.

@nsivabalan - you mentioned that the new scan API fixes another, unrelated issue as well. That means we roll out USE_LOG_RECORD_READER_SCAN_V2 as true with priority, correct? We don't have to tie this to the log compaction rollout. Tracking this here: https://issues.apache.org/jira/browse/HUDI-4940.
Please update the JIRA link to the actual issue fixed (or please create one). Thanks.

Summary: Make useScanV2 value to read log blocks configurable

Reviewers: balajee

Reviewed By: balajee

JIRA Issues: HUDI-2398

Differential Revision: https://code.uberinternal.com/D8093885
@suryaprasanna (Contributor, Author)

@prasannarajaperumal and @nsivabalan thanks a lot for reviewing the diff. I have addressed all the comments.

@nsivabalan (Contributor) left a comment

Addressed most comments myself; 3 or 4 are pending to be discussed.

final String key = keyIterator.next();
HoodieRecord<T> record = (HoodieRecord<T>) recordMap.get(key);
init(record);
// For logCompaction operations, all the records are read and written as one huge block.

I have left a comment elsewhere. I am a bit confused here; let's discuss this f2f.

if (isFileGroupInPendingCompaction(fileGroup)) {
fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)
&& !isFileSliceNeededForPendingLogCompaction(fs)).iterator();
if (isFileGroupInPendingCompaction(fileGroup) || isFileGroupInPendingLogCompaction(fileGroup)) {

Sorry, I don't see this being addressed.

      if (isFileGroupInPendingCompaction(fileGroup)) {
        // We have already saved the last version of file-groups for pending compaction Id
        keepVersions--;
      }

In fact, you removed isFileGroupInPendingLogCompaction from the if condition and missed fixing the other impl.
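For reference, a minimal sketch of the fix being pointed at (the method and class here are hypothetical scaffolding; only the two pending-compaction checks mirror the snippet above): a file group under pending log compaction should also keep one fewer extra version, exactly like one under pending compaction.

```java
// Hypothetical sketch; not the actual cleaner implementation.
public class KeepVersionsSketch {
  static int adjustKeepVersions(boolean pendingCompaction, boolean pendingLogCompaction, int keepVersions) {
    if (pendingCompaction || pendingLogCompaction) {
      // The latest file-slice version is already retained for the pending (log) compaction instant.
      keepVersions--;
    }
    return keepVersions;
  }

  public static void main(String[] args) {
    // A file group under pending log compaction keeps one fewer extra version.
    System.out.println(adjustKeepVersions(false, true, 3)); // prints 2
  }
}
```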

String compactedFinalInstantTime = blockTimeToCompactionBlockTimeMap.get(instantTime);
if (compactedFinalInstantTime == null) {
  // If it is not compacted, then add the blocks related to the instant time at the end of the queue and continue.
  instantToBlocksMap.get(instantTime).forEach(block -> currentInstantLogBlocks.addLast(block));

Is this addressed?

@hudi-bot (Collaborator)

hudi-bot commented Oct 8, 2022

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@nsivabalan (Contributor) left a comment

LGTM. Good job on the patch, buddy!

@nsivabalan nsivabalan merged commit 86a1efb into apache:master Oct 9, 2022

Labels

big-needle-movers priority:critical Production degraded; pipelines stalled

Projects

Status: Done


7 participants